{
 "cells": [
  {
   "attachments": {},
   "cell_type": "markdown",
   "id": "e4a4ffd8-8aa5-4b6e-b60a-f4ef14049c46",
   "metadata": {},
   "source": [
    "## Druid 26.0 release notebook"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "id": "3a008975-3100-417b-8ddc-623857d5ad6a",
   "metadata": {
    "tags": []
   },
   "source": [
    "<!--\n",
    "  ~ Licensed to the Apache Software Foundation (ASF) under one\n",
    "  ~ or more contributor license agreements.  See the NOTICE file\n",
    "  ~ distributed with this work for additional information\n",
    "  ~ regarding copyright ownership.  The ASF licenses this file\n",
    "  ~ to you under the Apache License, Version 2.0 (the\n",
    "  ~ \"License\"); you may not use this file except in compliance\n",
    "  ~ with the License.  You may obtain a copy of the License at\n",
    "  ~\n",
    "  ~   http://www.apache.org/licenses/LICENSE-2.0\n",
    "  ~\n",
    "  ~ Unless required by applicable law or agreed to in writing,\n",
    "  ~ software distributed under the License is distributed on an\n",
    "  ~ \"AS IS\" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY\n",
    "  ~ KIND, either express or implied.  See the License for the\n",
    "  ~ specific language governing permissions and limitations\n",
    "  ~ under the License.\n",
    "  -->\n",
    "  \n",
    "This notebook highlights some of the new features released in Druid 26.0.\n",
    "\n",
    "Before you begin, ensure you have the following:\n",
    "* The `pandas` Python package\n",
    "* The `requests` Python package\n",
    "* A running Druid instance.\n",
    "* Jupyter Lab or Jupyter Notebook running on a non-default port. By default, Druid and Jupyter both try to use port 8888, so start Jupyter on a different port. For more information on using Jupyter notebooks with Druid, see [Jupyter Notebook tutorials](https://druid.apache.org/docs/latest/tutorials/tutorial-jupyter-index.html).\n",
    "\n",
    "## Features\n",
    "* [Schema auto-discovery](#Schema-auto-discovery)\n",
    "* [Shuffle join](#Shuffle-join)\n",
    "* [UNNEST and arrays](#UNNEST-and-arrays)"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "id": "f02a76ed-8600-4afa-a37e-c3519005e2ab",
   "metadata": {},
   "source": [
    "## Verify Druid version"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "18cc6a82-0167-423c-b14d-01c36ac2733d",
   "metadata": {
    "tags": []
   },
   "outputs": [],
   "source": [
    "import requests\n",
    "\n",
    "druid_host = \"http://localhost:8888\"\n",
    "session = requests.Session()\n",
    "endpoint = druid_host + '/status'\n",
    "response = session.get(endpoint)\n",
    "json = response.json()\n",
    "print(\"Running on Druid version: \"+ json[\"version\"])"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "id": "c39b6caf-e08a-41c0-9021-12ee270023c1",
   "metadata": {
    "tags": []
   },
   "source": [
    "## Schema auto-discovery\n",
    "\n",
    "### What would happen in the past if we just load this data?\n",
    "\n",
    "Previously, Druid already supports [string-based schema auto-discovery](https://druid.apache.org/docs/latest/ingestion/schema-design.html#string-based-schema-discovery), but it has some limitations. Specifically, all the newly discovered columns will be stored as string types. This means aggregation queries on numerical columns can be slow (since they need to be parsed as numbers first), and some fields such as multi-value dimensions with null values can misbehave.\n",
    "\n",
    "With the introduction of [type-aware schema auto-discovery](https://druid.apache.org/docs/latest/ingestion/schema-design.html#type-aware-schema-discovery), Druid now properly infers data types. Set this in an ingestion job by including `\"useSchemaDiscovery\": True` in the `dimensionsSpec` object. In the example below, you perform a batch ingestion job and instruct Druid to automatically infer the input data types as long, float, string, etc. Run the following cell, then go to the [web console](http://localhost:8888/unified-console.html#ingestion) to check the progress of your ingestion task."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "ee16e5bc-7e7a-4da5-9816-99d161100522",
   "metadata": {
    "tags": []
   },
   "outputs": [],
   "source": [
    "import json\n",
    "from IPython.display import JSON\n",
    "ingestion_spec = {\n",
    "  \"type\": \"index_parallel\",\n",
    "  \"spec\": {\n",
    "    \"ioConfig\": {\n",
    "      \"type\": \"index_parallel\",\n",
    "      \"inputSource\": {\n",
    "        \"type\": \"http\",\n",
    "        \"uris\": [\"https://druid.apache.org/data/wikipedia.json.gz\"],\n",
    "        \"filter\": \"*\"\n",
    "      },\n",
    "      \"inputFormat\": {\n",
    "        \"type\": \"json\"\n",
    "      }\n",
    "    },\n",
    "    \"tuningConfig\": {\n",
    "      \"type\": \"index_parallel\",\n",
    "      \"partitionsSpec\": {\n",
    "        \"type\": \"dynamic\"\n",
    "      },\n",
    "      \"indexSpec\": {\n",
    "        \"stringDictionaryEncoding\": {\n",
    "          \"type\": \"frontCoded\",\n",
    "          \"bucketSize\": 16\n",
    "        }\n",
    "      }\n",
    "    },\n",
    "    \"dataSchema\": {\n",
    "      \"dataSource\": \"wikipedia\",\n",
    "      \"timestampSpec\": {\n",
    "        \"missingValue\": \"2010-01-01T00:00:00Z\"\n",
    "      },\n",
    "      \"dimensionsSpec\": {\n",
    "        \"dimensions\": [],\n",
    "        \"dimensionExclusions\": [],\n",
    "        \"spatialDimensions\": [],\n",
    "        \"useSchemaDiscovery\": True\n",
    "      },\n",
    "      \"granularitySpec\": {\n",
    "        \"queryGranularity\": \"none\",\n",
    "        \"rollup\": False\n",
    "      }\n",
    "    }\n",
    "  }\n",
    "}\n",
    "\n",
    "JSON(ingestion_spec,expanded=True)\n",
    "\n",
    "endpoint = druid_host + '/druid/indexer/v1/task/'\n",
    "response = session.post(endpoint,json = ingestion_spec)\n"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "id": "2617af1b",
   "metadata": {},
   "source": [
    "Note that because we've set `\"useSchemaDiscovery\": True` in the ingestion spec, even though we didn't specify any data types for the columns, they are correctly inferred. The following cell queries the information schema metadata table and displays the data types of the columns in the `wikipedia` table you just ingested."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "7d3bc513-8215-4299-9bf4-135ec65cae98",
   "metadata": {
    "tags": []
   },
   "outputs": [],
   "source": [
    "import pandas as pd\n",
    "endpoint = druid_host + '/druid/v2/sql'\n",
    "sql = '''\n",
    "SELECT *\n",
    "FROM \"INFORMATION_SCHEMA\".\"COLUMNS\"\n",
    "WHERE  \"TABLE_NAME\" = 'wikipedia'\n",
    "'''\n",
    "sql_request = {'query': sql}\n",
    "json_data = session.post(endpoint, json=sql_request).json()\n",
    "result_df = pd.json_normalize(json_data)\n",
    "result_df.head()"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "id": "483c67d7",
   "metadata": {},
   "source": [
    "As you can see, in the `DATA_TYPE` column, different data types are correctly detected. With string-based schema auto-discovery, Druid would have stored the data as `string` types."
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "id": "08a3b808-e138-47c7-b7f1-e3a6c9f3bad3",
   "metadata": {},
   "source": [
    "## Shuffle join\n",
    "\n",
    "### Make it really easy to denormalize data as part of ingestion\n",
    "Before the support of shuffle join, you'll need to use another tool to prepare the data then ingest into Druid. With shuffle join support, you can do the same transformation with one query.\n",
    "For example, in the query below, the user does a self-join on the wikipedia dataset. You can easily do the same query with a typical star-schema dataset. "
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "0dc81a51-0160-4cd6-bd97-6abf60a6e7d6",
   "metadata": {
    "tags": []
   },
   "outputs": [],
   "source": [
    "query = '''\n",
    "REPLACE INTO \"wikipedia\" OVERWRITE ALL\n",
    "WITH \"wikipedia_main\" AS (SELECT *\n",
    "FROM TABLE(\n",
    "  EXTERN(\n",
    "    '{\"type\":\"http\",\"uris\":[\"https://druid.apache.org/data/wikipedia.json.gz\"]}',\n",
    "    '{\"type\":\"json\"}'\n",
    "  )\n",
    ") EXTEND (\"channel\" VARCHAR, \"timestamp\" VARCHAR,\"user\" VARCHAR))\n",
    ",\n",
    "\"wikipedia_dim\" AS (SELECT *\n",
    "FROM TABLE(\n",
    "  EXTERN(\n",
    "    '{\"type\":\"http\",\"uris\":[\"https://druid.apache.org/data/wikipedia.json.gz\"]}',\n",
    "    '{\"type\":\"json\"}'\n",
    "  )\n",
    ") EXTEND (\"timestamp\" VARCHAR,\"user\" VARCHAR,\"comment\" VARCHAR, \"commentLength\" BIGINT, \"cityName\" VARCHAR, \"countryName\" VARCHAR))\n",
    "\n",
    "\n",
    "SELECT\n",
    "  TIME_PARSE(\"wikipedia_main\".\"timestamp\") AS \"__time\",\n",
    "  \"wikipedia_main\".*,\n",
    "  \"wikipedia_dim\".*\n",
    "FROM \"wikipedia_main\"\n",
    "LEFT JOIN \"wikipedia_dim\" \n",
    "ON \n",
    "\"wikipedia_main\".\"user\" = \"wikipedia_dim\".\"user\"\n",
    "AND \n",
    "\"wikipedia_main\".\"timestamp\" = \"wikipedia_dim\".\"timestamp\"\n",
    "\n",
    "PARTITIONED BY MONTH\n",
    "'''"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "id": "e10df053-2729-4e2c-ac4a-3c8d0c070dc0",
   "metadata": {},
   "source": [
    "### Let's watch the ingestion task run...\n",
    "Submit the preceding query and monitor the ingestion job by running the following cells. This may take a while. You can check the status of the ingestion task in the [web console](http://localhost:8888/unified-console.html#ingestion)."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "9d302e43-9f14-4d19-b286-7a3cbc448470",
   "metadata": {
    "tags": []
   },
   "outputs": [],
   "source": [
    "# This block submits the ingestion query\n",
    "sql_request={'query': query}\n",
    "endpoint = druid_host + '/druid/v2/sql/task'\n",
    "response = session.post(endpoint, json=sql_request)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "eadf05f7-bc0a-4a29-981d-d8bc5fd72314",
   "metadata": {
    "tags": []
   },
   "outputs": [],
   "source": [
    "# This block monitors the ingestion query (Takes about 25-35 seconds)\n",
    "ingestion_taskId = response.json()['taskId']\n",
    "endpoint = druid_host + f\"/druid/indexer/v1/task/{ingestion_taskId}/status\"\n",
    "import time\n",
    "\n",
    "json = session.get(endpoint).json()\n",
    "ingestion_status = json['status']['status']\n",
    " \n",
    "print(\"The ingestion is running...\")\n",
    "\n",
    "while ingestion_status == \"RUNNING\":\n",
    "    time.sleep(1)\n",
    "    json = session.get(endpoint).json()\n",
    "    ingestion_status = json['status']['status']\n",
    "    print('.', end='')\n",
    "\n",
    "if ingestion_status == \"SUCCESS\": \n",
    "    print(\"\\nThe ingestion is complete\")\n",
    "else:\n",
    "    print(\"\\nThe ingestion task failed:\", json)\n"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "id": "10417469-b2f7-4a56-bd4f-fddc0277c3c9",
   "metadata": {},
   "source": [
    "### Note I didn't use any other tools, this is all done within Druid. No need for using Spark/Presto for data prep"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "id": "7b134ef2-e3ef-4345-94c8-64cf36f6adfe",
   "metadata": {
    "tags": []
   },
   "source": [
    "## UNNEST and arrays\n",
    "\n",
    "UNNEST is useful to deal with Array data and allows you to \"explode\" an array into individual rows.\n",
    "\n",
    "In this example, we are looking at an array of tags, which includes `almond`, `blue_berry` and `muffin`. We can use UNNEST to explode the array into individual rows, and then perform a GROUP BY on the tags."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "434602dd-d62b-476f-b18f-4a3fa23ff70e",
   "metadata": {
    "tags": []
   },
   "outputs": [],
   "source": [
    "import pandas as pd\n",
    "endpoint = druid_host + '/druid/v2/sql'\n",
    "sql = '''\n",
    "SELECT 'post_id_123' AS \"POST_ID\", ARRAY['almond','blue_berry','muffin'] as \"Tags\"\n",
    "'''\n",
    "sql_request = {'query': sql}\n",
    "json_data = session.post(endpoint, json=sql_request).json()\n",
    "result_df = pd.json_normalize(json_data)\n",
    "result_df.head()"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "id": "c5d8e9a1-194a-4fc8-9759-863672271565",
   "metadata": {},
   "source": [
    "For more examples and details on UNNEST, see [Unnest arrays within a column](https://druid.apache.org/docs/latest/tutorials/tutorial-unnest-arrays.html)."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "3b7d80ad-e7a0-4e4b-a926-177112dc9c93",
   "metadata": {
    "tags": []
   },
   "outputs": [],
   "source": [
    "import pandas as pd\n",
    "endpoint = druid_host + '/druid/v2/sql'\n",
    "sql = '''SELECT 'post_id_123' as \"POST_ID\", * FROM UNNEST(ARRAY['almond','blue_berry','muffin']) \n",
    "'''\n",
    "sql_request = {'query': sql, 'context':{'enableUnnest': 'true'}}\n",
    "json_data = session.post(endpoint, json=sql_request).json()\n",
    "JSON(json_data)\n",
    "result_df = pd.json_normalize(json_data)\n",
    "result_df.head()"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "id": "9fdf81d2",
   "metadata": {},
   "source": [
    "Well, you've made it this far, try out some of the new features and let us know what you think!"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3 (ipykernel)",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.10.0"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 5
}
